In [None]:
# Code source: Sebastian Curi and Andreas Krause.

# Python Notebook Commands
%matplotlib inline
%reload_ext autoreload
%load_ext autoreload
%autoreload 2

# Numerical Libraries
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from matplotlib.patches import Ellipse

from matplotlib import rcParams
rcParams['figure.figsize'] = (10, 5)   # Change this if figures look ugly. 
import time 
import copy 

# IPython Libraries
import IPython
import ipywidgets
from ipywidgets import interact, interactive, interact_manual


# SKLEARN Libraries
from sklearn import datasets
from sklearn.decomposition import KernelPCA
from sklearn.decomposition import PCA as LinearPCA

# TORCH Libraries
import torch 
import torch.nn as nn 
from torch.autograd import Variable

import torchvision
from torchvision import transforms
from torchvision.transforms.functional import normalize


In [None]:
def PCA(X, num_components=1, solution_method='eigval'):
    """Compute PCA of data matrix X.
    
    Parameters
    ----------
    X: np.ndarray shape [num_points x dimension]
        - It is assumed that the data has zero-mean. 
    
    num_components: int, optional.
        Number of components in low dimensional space. Default = 1.
    
    solution_method: str, optional.
        - 'eigval': eigendecomposition of empirical covariance matrix
        - 'SVD': SVD of data matrix.
        
    Returns:
    --------
    Z: np.ndarray shape [num_points x num_components]
        Low dimensional representation of X.
        
    X_proj: np.ndarray shape[num_points x dimension]
        Back projected data.
    
    W: np.ndarray [dimension x num_components]
        Projection matrix.
    
    S: np.ndarray [dimension]
        Diagonal elements of covariance matrix, for plotting. 
    """
    assert num_components <= X.shape[-1]
    if solution_method == 'eigval':
        # Calculate empirical covariance matrix.
        Sigma = X.T @ X

        # Perform eigenvalue decomposition. 
        eig_vals, eig_vecs = np.linalg.eig(Sigma)
        idx = np.argsort(eig_vals)[::-1]
        eig_vals = eig_vals[idx]
        eig_vecs = eig_vecs[:, idx]
        
        # Get principal directions.
        W = eig_vecs[:, :num_components]
        
        # Calculate ellipse diagonals (only for plotting).
        S = np.sqrt(eig_vals) / np.sqrt(X.shape[0])
    
    elif solution_method == 'SVD':
        # Perform SVD factorization of matrix X.
        U, S, V = np.linalg.svd(X) 
        
        # Get principal directions.
        W = V[:num_components].T
        
        # Calculate ellipse diagonals (only for plotting).
        S = S / np.sqrt(X.shape[0])

    # Project to low-dimensional space
    Z = X @ W
    
    # Project back to high-demensional space.
    X_proj = (Z @ W.T)
    
    
    
    return Z, X_proj, W, S

def get_dataset(dataset, n_samples):
    if dataset == 'blobs':
        X, Y = datasets.make_blobs(n_samples=n_samples, random_state=4)
    elif dataset == 'circles':
        X, Y = datasets.make_circles(n_samples=n_samples, factor=.3, noise=.05)
    elif dataset == 'moons':
        X, Y = datasets.make_moons(n_samples=n_samples, noise=.05)
    elif dataset == 'no structure':
        X, Y = np.random.rand(n_samples, 2), None 
    elif dataset == 'anisotropic':
        X, Y = datasets.make_blobs(n_samples=n_samples, random_state=170)
        transformation = [[0.6, -0.6], [-0.4, 0.8]]
        X = np.dot(X, transformation)
    elif dataset == 'varied variance':
        X, Y = datasets.make_blobs(n_samples=n_samples, cluster_std=[1.0, 2.5, 0.5], random_state=170)
    return X, Y

# Linear PCA

In [None]:
rcParams['figure.figsize'] = (15, 8)   # Change this if figures look ugly. 

def plot_data(dataset, plots):
    # Generate Data
    np.random.seed(0)
    X, Y = get_dataset(dataset, n_samples=400)
        
    # Zero Mean
    X = X - np.mean(X, axis=0)
    min_X, max_X = np.min(X, axis=0), np.max(X, axis=0)

    Z, X_proj, W, S = PCA(X)
    
    # PLOT Data
    ax = plt.gca()

#     if 'Data' in plots:
    ax.plot(X[:, 0], X[:, 1], '*b')
    
    if 'Projected Data' in plots:
        ax.plot(X_proj[:, 0], X_proj[:, 1], '*r')
        
        
    if 'Covariance Ellipse' in plots:
        w, h = 3.5 * S
        theta = np.degrees(np.arctan2(*W[::-1])) 

        color='r'
        ell = Ellipse((0, 0), width=w, height=h, angle=theta, facecolor='r', alpha=0.2, lw=0)
        edge = Ellipse((0, 0), width=w, height=h, angle=theta, facecolor='none', edgecolor='r', lw=2)
        
        ax.plot(0, 0, 'xr')
        ax.add_artist(ell)
        ax.add_artist(edge)

    if 'Projection Line' in plots:
        x = np.linspace(min_X[0], max_X[0], 100)
        y = x * W[1, 0] / W[0, 0]
        ax.plot(x, y, '--r')
    
    # Make plots pretty
    ax.axis('equal')

    ax.set_xlim([min_X[0], max_X[0]])
    ax.set_ylim([min_X[1], max_X[1]])

dataset_widget = ipywidgets.Dropdown(
    value='blobs', 
    options=['blobs', 'circles', 'moons', 'anisotropic', 'varied variance', 'no structure'], 
    description='Dataset:',  style={'description_width': 'initial'}, continuous_update=False)

plot_widget = ipywidgets.SelectMultiple(
    options=['Projected Data', 'Covariance Ellipse', 'Projection Line'],
    value=[],
    rows=4,
    description='Plot',
    disabled=False
)

interact(plot_data, dataset=dataset_widget, plots=plot_widget);

# Kernel PCA

In [None]:
rcParams['figure.figsize'] = (10, 5)   # Change this if figures look ugly. 
rcParams['font.size'] = 16   # Change this if figures look ugly. 

import sklearn.datasets
def plot_data(dataset, kernel, bandwidth, degree):
    # Generate Data
    np.random.seed(0)
    X, Y = get_dataset(dataset, n_samples=400)
    # Zero Mean
    X = X - np.mean(X, axis=0)
    min_X, max_X = np.min(X, axis=0), np.max(X, axis=0)
    
    kpca = KernelPCA(kernel=kernel, gamma=1./bandwidth, degree=degree, n_components=3)
    X_pca = kpca.fit_transform(X)
    min_X_, max_X_ = np.min(X_pca, axis=0), np.max(X_pca, axis=0)

    fig, ax = plt.subplots(2, 2, figsize=(15, 10))
    ax[0, 0].set_title("Original Data")
    ax[0, 0].scatter(X[:, 0], X[:, 1], c=Y);
    
    # Make plots pretty
    ax[0, 0].axis('equal')
    ax[0, 0].set_xlim([min_X[0], max_X[0]])
    ax[0, 0].set_ylim([min_X[1], max_X[1]])

    for p in range(1, 4):
        if p == 1:
            i, j = 0, 1
        elif p == 2:
            i, j = 0, 2
        elif p == 3:
            i, j = 1, 2
    
        ax[p // 2, p % 2].set_title(f"PCA Space Feature {i + 1} vs Feature {j + 1}")
        ax[p // 2, p % 2].scatter(X_pca[:, i], X_pca[:, j], c=Y);

        ax[p // 2, p % 2].axis('equal')
        ax[p // 2, p % 2].set_xlim([min_X_[i] - 0.1, max_X_[i] + 0.1])
        ax[p // 2, p % 2].set_ylim([min_X_[j] + 0.1, max_X_[j] - 0.1])
    

dataset_widget = ipywidgets.Dropdown(
    value='blobs', 
    options=['blobs', 'circles', 'moons', 'anisotropic', 'varied variance', 'no structure'], 
    description='Dataset:',  style={'description_width': 'initial'}, continuous_update=False)

kernel_widget = ipywidgets.Dropdown(
    value='rbf', 
    options=['rbf', 'poly', 'linear'], 
    description='Kernel:',  style={'description_width': 'initial'}, continuous_update=False)

num_degree_widget = ipywidgets.IntSlider(
    value=2, min=2, max=10,
    description='Degree',
    disabled=False, continuous_update=False
)

bandwidth_widget = ipywidgets.FloatLogSlider(
    value=0.1, min=-2, max=0.,
    description='Bandwidth',
    disabled=False, continuous_update=False
)

interact(plot_data, dataset=dataset_widget, kernel=kernel_widget, 
         bandwidth=bandwidth_widget, degree=num_degree_widget);

# Autoencoders

In [None]:
class View(nn.Module):
    def __init__(self, output_size):
        super().__init__()
        self.output_size = output_size
        
    def forward(self, x):
        return x.view(-1, self.output_size)

class PCAAutoencoder(nn.Module):
    """Linear Autoencoder = PCA Size."""
    def __init__(self, code_size):
        super().__init__()
        # Encoder
        self.encoder = nn.Sequential(
            View(28 * 28),
            nn.Linear(28 * 28, code_size)
        )
        
        self.decoder = nn.Sequential(
            nn.Linear(code_size, 28 * 28),
            nn.Tanh()
        )
    
    def forward(self, x):
        z = self.encoder(x)
        x_back = self.decoder(z)
        return x_back.reshape(-1, 1, 28, 28)

    
class Autoencoder(nn.Module):
    """The code is an vector with `code_size' entries."""
    def __init__(self, code_size):
        super().__init__()
        # Encoder
        self.encoder = nn.Sequential(
            View(28 * 28),
            nn.Linear(28 * 28, 128), nn.ReLU(True),
            nn.Linear(128, 64), nn.ReLU(True), 
            nn.Linear(64, code_size),
#             torch.quantization.FakeQuantize()
        )
        
        self.decoder = nn.Sequential(
            nn.Linear(code_size, 64), nn.ReLU(True),
            nn.Linear(64, 128), nn.ReLU(True), 
            nn.Linear(128, 28 * 28), nn.Tanh())
    
    def forward(self, x):
        z = self.encoder(x)
        x_back = self.decoder(z)
        return x_back.reshape(-1, 1, 28, 28)
        
    
class ConvAutoencoder(nn.Module):
    """The code is an 2 x 2 image with code_size channels."""
    def __init__(self, code_size):
        super().__init__()
        # Encoder
        self.encoder = nn.Sequential(
            nn.Conv2d(in_channels=1, out_channels=16, kernel_size=3, stride=3, padding=1),  # b, 16, 10, 10
            nn.ReLU(True),
            nn.MaxPool2d(2, stride=2),  # b, 16, 5, 5
            nn.Conv2d(in_channels=16, out_channels=code_size, kernel_size=3, stride=2, padding=1),  # b, code_size, 3, 3
            nn.ReLU(True),
            nn.MaxPool2d(2, stride=1),  # b, code_size, 2, 2
#             torch.quantization.FakeQuantize()
        )
        # Decoder
        self.decoder = nn.Sequential(
            nn.ConvTranspose2d(code_size, 16, 3, stride=2),  # b, 16, 5, 5
            nn.ReLU(True),
            nn.ConvTranspose2d(16, 8, 5, stride=3, padding=1),  # b, 8, 15, 15
            nn.ReLU(True),
            nn.ConvTranspose2d(8, 1, 2, stride=2, padding=1),  # b, 1, 28, 28
            nn.Tanh()
        )
        
    def forward(self, x):
        z = self.encoder(x)
        x = self.decoder(z)
        return x


In [None]:
rcParams['figure.figsize'] = (10, 5)   # Change this if figures look ugly. 
rcParams['font.size'] = 16   # Change this if figures look ugly. 

from torch.optim.rmsprop import RMSprop
distance = nn.MSELoss()  
num_plots = 12
batch_size = 128

t2pil = transforms.ToPILImage()
tranforms_ = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5,), (0.5, ))])
train_set = torchvision.datasets.MNIST('../data', train=True, download=True, transform=tranforms_)
test_set = torchvision.datasets.MNIST('../data', train=False, download=True, transform=tranforms_)
train_loader = torch.utils.data.DataLoader(train_set, batch_size=batch_size, shuffle=True)
test_loader = torch.utils.data.DataLoader(test_set, batch_size=batch_size, shuffle=False)

def run_training(autoencoder, optimizer, code_size):
    losses = []
    num_epochs = 1
    try:
        if autoencoder == 'AutoEncoder':
            model = Autoencoder(code_size=code_size)
            lr = 0.5
            num_epochs = 1
            
        elif autoencoder == 'PCA':
            num_epochs = 1
            data = normalize(train_set.data.float() / 255., (0.5,), (0.5,))
            X = data.reshape(-1, 28*28)
            model = LinearPCA(n_components=code_size)
            model.fit((X).numpy())
            
        elif autoencoder == 'ConvAutoEncoder':
            model = Autoencoder(code_size=code_size)
            lr = 0.5
            num_epochs = 1
            
        elif autoencoder == 'Mini-Batch PCA':
            model = PCAAutoencoder(code_size=code_size)
            lr = 0.1
            num_epochs = 3

        if not isinstance(model, LinearPCA):
            if optimizer == 'Adam':
                optimizer = RMSprop(model.parameters(), lr=1e-3, weight_decay=1e-5)
            elif optimizer == 'RMSprop':
                optimizer = RMSprop(model.parameters(), lr=1e-3, weight_decay=1e-5)
            elif optimizer == 'SGD':  
                optimizer = torch.optim.SGD(model.parameters(), lr=lr, momentum=0.9, weight_decay=1e-5)

        for epoch in range(num_epochs):
            for i, (img, label) in enumerate(train_loader):
                if isinstance(model, LinearPCA):
                    img = X[np.random.choice(X.shape[0], batch_size)]
                    code = model.transform(img)
                    output = torch.tensor(model.inverse_transform(code))

                    img = img.reshape(-1, 1, 28, 28)
                    output = output.reshape(-1, 1, 28, 28).clamp(-1, 1)
                    
                else:
                    output = model(img)
                loss = distance(output, img)

                if not isinstance(model, LinearPCA):
                    optimizer.zero_grad()
                    loss.backward()
                    optimizer.step()
                
                output = output.clamp(-1, 1)
                reshaped_img = torch.cat(tuple(img), dim=-1)[..., :28 * num_plots]
                reshaped_out =  torch.cat(tuple(output), dim=-1)[..., :28 * num_plots]
                
                if isinstance(model, LinearPCA):
                    code = torch.tensor(code[:num_plots]).reshape(num_plots, -1)
                else:
                    code = model.encoder(img)[:num_plots].reshape(num_plots, -1)

                code = torch.cat((code, torch.zeros(code.shape[0], 10 * 28 - code.shape[1])), dim=-1)  # Zero Pad
                code = torch.cat(tuple(code), dim=0)
                code = code.reshape(-1, 28).reshape(-1, 10, 28)
                code = torch.cat(tuple(code), dim=-1).unsqueeze(0)
                code = torch.cat((torch.zeros_like(code), code), dim=1)

                losses.append(loss.item())
                if i % 10 == 0:
                    IPython.display.clear_output(wait=True)
                    plt.close()

                    print(f"epoch [{epoch + 1}/{num_epochs}], iter {i}, loss:{loss.item():.4f}")
                    im = torch.cat((reshaped_img, reshaped_out, code), dim=-2)
                    plt.figure(figsize=(16, 8))
                    plt.imshow(t2pil(im), cmap='Greys')
                    plt.show()
                    
    except KeyboardInterrupt:
        IPython.display.clear_output(wait=True)
        plt.close()

        print(f"epoch [{epoch + 1}/{num_epochs}], iter {i}, loss:{loss.item():.4f}")
        im = torch.cat((reshaped_img, reshaped_out, code), dim=-2)
        plt.figure(figsize=(16, 8))
        plt.imshow(t2pil(im), cmap='Greys')
        plt.show()

    plt.figure(figsize=(16, 8))
    plt.plot(losses)
    plt.xlabel('Iteration')
    plt.ylabel('Training Loss')
                                
    

code_size_widget = ipywidgets.IntSlider(
    value=16, min=1, max=64,
    description='Components',
    disabled=False, continuous_update=False
)
interact_manual(run_training, 
                autoencoder=['AutoEncoder', 'ConvAutoEncoder', 'PCA', 'Mini-Batch PCA'], 
                optimizer=['Adam', 'RMSprop', 'SGD'],
                code_size=code_size_widget);
